package com.changgou.canal.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.changgou.canal.config.RabbitMQConfig;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/**
 * Canal listener for the Business microservice database ({@code changgou_business}).
 *
 * <p>Watches row changes on the {@code tb_ad} table and publishes the affected
 * ad {@code position} value(s) to RabbitMQ.
 *
 * <p>Created 2020/7/22 19:26.
 *
 * @author ZW
 */
@CanalEventListener  // marks this class as a Canal event listener
public class BusinessListener {

    /** Name of the {@code tb_ad} column whose value is published. */
    private static final String POSITION_COLUMN = "position";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Handles a row change on {@code changgou_business.tb_ad}.
     *
     * <p>The {@code position} value is read from both the after-image and the
     * before-image of the row:
     * <ul>
     *   <li>INSERT: only the after-image has columns, so the new position is published;</li>
     *   <li>DELETE: only the before-image has columns, so the removed row's position is
     *       published (reading the after-image alone would publish nothing);</li>
     *   <li>UPDATE: the new position is published, and the old position is published
     *       as well when it differs from the new one.</li>
     * </ul>
     * NOTE(review): this assumes consumers of the queue want to be told about every
     * position whose set of ads changed — confirm against the consumer.
     *
     * @param eventType the kind of database operation that produced this row change
     *                  (not inspected; the before/after images already distinguish the cases)
     * @param rowData   the changed row, carrying its before- and after-image columns
     */
    @ListenPoint(schema = "changgou_business", table = "tb_ad")
    public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        String newPosition = findPosition(rowData.getAfterColumnsList());
        String oldPosition = findPosition(rowData.getBeforeColumnsList());

        if (newPosition != null) {
            publishPosition(newPosition);
        }
        // Publish the previous position too, but only once if it is unchanged.
        if (oldPosition != null && !oldPosition.equals(newPosition)) {
            publishPosition(oldPosition);
        }
    }

    /** Returns the value of the {@code position} column in {@code columns}, or {@code null} if absent. */
    private String findPosition(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            if (POSITION_COLUMN.equals(column.getName())) {
                return column.getValue();
            }
        }
        return null;
    }

    /** Logs and sends one position value to the ad-update queue. */
    private void publishPosition(String position) {
        System.out.println("发送最新的消息到MQ:" + position);

        // An empty exchange name selects the default exchange, so the routing key
        // is used directly as the destination queue name.
        rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE, position);
    }
}